/*
 * Collie - An asynchronous event-driven network framework using Dlang development
 *
 * Copyright (C) 2015-2017  Shanghai Putao Technology Co., Ltd
 *
 * Developer: putao's Dlang team
 *
 * Licensed under the Apache-2.0 License.
 *
 */
module collie.net.client.clientmanger;

import std.socket;

import kiss.exception;
import kiss.event;
import kiss.util.timer;
import kiss.net.TcpStream;
import collie.net.client.linklogInfo;
import collie.net.client.exception;

import kiss.event.timer.common;
import kiss.event.task;
import kiss.util.functional;

/**
 * Creates outgoing TCP connections on one event loop, retries failed
 * connection attempts, and (optionally) registers the resulting
 * connections in a timing wheel driven by a periodic timer.
 */
final class TCPClientManger
{
    /// Called with every freshly created stream before it starts connecting.
    alias ClientCreatorCallBack = void delegate(TcpStream);
    /// Per-connect result callback; receives `null` when no connection was produced.
    alias ConCallBack = void delegate(ClientConnection);
    /// Book-keeping record for one pending connect request.
    alias LinklogInfo = TLinklogInfo!ConCallBack;
    /// Factory that wraps a connected stream in a `ClientConnection`.
    alias NewConnection = ClientConnection delegate(TcpStream);

    /// Params: loop = event loop on which all streams and the timer are created.
    this(EventLoop loop)
    {
        _loop = loop;
    }

    /// Sets the hook invoked with each new stream before `connect` is issued on it.
    void setClientCreatorCallBack(ClientCreatorCallBack cback)
    {
        _oncreator = cback;
    }

    /// Sets the connection factory. It must be set before `connect` is called.
    void setNewConnectionCallBack(NewConnection cback)
    {
        _cback = cback;
    }

    /// The event loop passed to the constructor.
    @property eventLoop(){return _loop;}
    /// The value last passed to `startTimeout`.
    @property timeout(){return _timeout;}
    /// Number of retries made after a failed connect attempt (default 1).
    @property tryCout(){return _tryCout;}
    /// ditto
    @property tryCout(uint count){_tryCout = count;}

    /**
     * Creates the timing wheel and its driving timer.
     *
     * The wheel size is chosen from the timeout and the timer interval is
     * `timeout * 1000 / wheelSize` (presumably milliseconds - confirm against
     * KissTimer). A timeout of 0 only records the value and creates nothing.
     *
     * Params:
     *   s = timeout, in the unit the caller uses for `timeout` (looks like seconds).
     *
     * Throws: SocketClientException if a wheel already exists.
     */
    void startTimeout(uint s)
    {
        if(_wheel !is null)
            throw new SocketClientException("TimeOut is runing!");

        _timeout = s;
        if(_timeout == 0 || _timer)
            return;

        // Upper bound of each timeout bucket and the wheel size used for it.
        // Kept unsigned and immutable: the previous `enum int[]` table stored
        // uint.max as -1 and only matched through the unsigned comparison.
        static immutable uint[] timeoutLimits = [40, 120, 600, 1000, uint.max];
        static immutable uint[] wheelSizes    = [50,  60, 100,  150,      300];

        uint whileSize;
        uint time;
        foreach(i, limit; timeoutLimits){
            if(s <= limit){
                whileSize = wheelSizes[i];
                // NOTE(review): uint arithmetic, wraps for timeouts above uint.max / 1000.
                time = _timeout * 1000 / whileSize;
                break;
            }
        }

        _wheel = new TimingWheel(whileSize);
        _timer = new KissTimer(_loop, time);
        _timer.onTick(&onTimer);

        // The timer is started on the loop thread; from elsewhere the start is posted.
        if(_loop.isInLoopThread()){
            _timer.start();
        } else {
            _loop.postTask(newTask(&_timer.start, false, false));
        }
    }

    /**
     * Starts an asynchronous connect to `addr`.
     *
     * Params:
     *   addr  = remote address.
     *   cback = optional callback invoked with the new connection, or with
     *           `null` when the factory produced none or all attempts failed.
     *
     * Throws: SocketClientException if no connection factory has been set.
     */
    void connect(Address addr,ConCallBack cback = null)
    {
        if(_cback is null)
            throw new SocketClientException("must set NewConnection callback ");

        LinklogInfo * logInfo = new LinklogInfo();
        logInfo.addr = addr;
        logInfo.tryCount = 0;
        logInfo.cback = cback;

        // The pending record is registered on the loop thread only.
        if(_loop.isInLoopThread()){
            _postConmnect(logInfo);
        } else {
            _loop.postTask(newTask(&_postConmnect,logInfo));
        }
    }

    /// Stops and drops the timer, if any. The wheel itself is left in place.
    void stopTimer(){
        if(_timer) {
            _timer.stop();
            _timer = null;
        }
    }

protected:
    /// Creates a new stream for `logInfo`, wires the temporary handlers and connects.
    void connect(LinklogInfo * logInfo)
    {
        logInfo.client = new TcpStream(_loop);
        if(_oncreator)
            _oncreator(logInfo.client);
        logInfo.client.onClosed(&tmpCloseCallBack);
        logInfo.client.onConnected(bind(&connectCallBack,logInfo));
        // logInfo.client.setReadHandle(&tmpReadCallBack);
        logInfo.client.connect(logInfo.addr);
    }

    /// Placeholder read handler (currently not installed).
    void tmpReadCallBack(in ubyte[]) nothrow {}
    /// Placeholder close handler used while a stream is still connecting.
    void tmpCloseCallBack() {}

    /**
     * Connect-result handler bound to one pending record.
     *
     * Params:
     *   logInfo = the pending record; a null record is ignored.
     *   state   = true when the stream connected.
     */
    void connectCallBack(LinklogInfo * logInfo,bool state)
    {
        catchAndLogException((){
            import std.exception;
            if(logInfo is null)return;
            if(state) {
                // Success: the record leaves the wait list however this branch exits.
                scope(exit){
                    _waitConnect.rmlogInfo(logInfo);
                }
                ClientConnection con;
                // An exception thrown by the factory is collected and dropped,
                // leaving `con` null.
                collectException(_cback(logInfo.client),con);
                if(logInfo.cback)
                    logInfo.cback(con);
                if(con is null) return;
                if(_wheel)
                    _wheel.addNewTimer(con);
                con.onActive();
            } else {
                // Failure: retry with a fresh stream while the budget allows,
                // otherwise report `null` to the caller.
                logInfo.client = null;
                if(logInfo.tryCount < _tryCout) {
                    logInfo.tryCount ++;
                    connect(logInfo);
                } else {
                    auto cback = logInfo.cback;
                    _waitConnect.rmlogInfo(logInfo);
                    if(cback)
                        cback(null);
                }
            }
        }());
    }

    /// Timer tick handler: forwards each tick to the wheel.
    void onTimer(Object ){
        _wheel.prevWheel();
    }

private:
    /// Registers the pending record and issues the first connect attempt.
    final void _postConmnect(LinklogInfo * logInfo){
        _waitConnect.addlogInfo(logInfo);
        connect(logInfo);
    }
private:
    uint _tryCout = 1;
    uint _timeout;

    EventLoop _loop;
    KissTimer _timer;
    TimingWheel _wheel;
    TLinkManger!ConCallBack _waitConnect;

    NewConnection _cback;
    ClientCreatorCallBack _oncreator;
}

/**
 * Base class for a client-side connection wrapping a `TcpStream`.
 *
 * Subclasses implement `onActive`, `onClose` and `onRead`. Writes, timeout
 * resets and closes may be requested from any thread; they are executed on
 * the stream's event loop.
 */
@trusted abstract class ClientConnection : WheelTimer
{
    /// Params: client = stream to attach; may be null, leaving the connection detached.
    this(TcpStream client)
    {
        resetClient(client);
    }

    /// True when a stream is attached and it reports `isRegistered`.
    final bool isAlive() @trusted {
        return _client && _client.isRegistered;
    }

    /// The attached stream, or null.
    final @property tcpClient()@safe {return _client;}

    /// Historical (misspelled) name kept for existing callers.
    alias restClient = resetClient;

    /**
     * Detaches the current stream (clearing its handlers) and attaches
     * `client`, if non-null, installing this connection's handlers on it.
     */
    final void resetClient(TcpStream client) @trusted
    {
        if(_client !is null){
            _client.onClosed(null);
            _client.onDataReceived(null);
            _client.onConnected(null);
            _client = null;
        }
        if(client !is null){
            _client = client;
            _loop = cast(EventLoop) client.eventLoop;
            _client.onClosed(&doClose);
            _client.onDataReceived(&onRead);
            _client.onConnected(&tmpConnectCallBack);
        }
    }

    /// Wraps `data` in a `SocketStreamBuffer` and queues it for writing.
    final void write(in ubyte[] data, DataWrittenHandler cback = null) @trusted
    {
        write(new SocketStreamBuffer(data,cback));
    }

    /**
     * Queues `buffer` for writing on the loop thread.
     *
     * When no stream has ever been attached there is no loop to run on; the
     * buffer is then finished immediately, as `_postWriteBuffer` does when
     * the stream is gone.
     */
    final void write(StreamWriteBuffer buffer) @trusted
    {
        if (_loop is null) {
            if (buffer !is null)
                buffer.doFinish();
            return;
        }
        if (_loop.isInLoopThread()) {
            _postWriteBuffer(buffer);
        } else {
            _loop.postTask(newTask(&_postWriteBuffer, buffer));
        }
    }

    /// Calls `rest` on the loop thread. Does nothing when no stream was ever attached.
    final void restTimeout() @trusted
    {
        if(_loop is null)
            return;
        if(_loop.isInLoopThread()){
            rest();
        } else {
            _loop.postTask(newTask(&rest,0));
        }
    }

    /// Posts a close of the attached stream to the loop. Does nothing when
    /// no stream was ever attached.
    pragma(inline)
    final void close() @trusted
    {
        if(_loop is null)
            return;
        _loop.postTask(newTask(&_postClose));
    }
protected:
    /// Invoked by the manager once the connection has been created.
    void onActive() nothrow;
    /// Invoked after the stream closed.
    void onClose() nothrow;
    /// Invoked with data received from the stream.
    void onRead(in ubyte[] data) nothrow;
private:
    /// Connect notifications are ignored once the connection exists.
    final void tmpConnectCallBack(bool) nothrow{}

    /// Close handler: calls `stop()` and then `onClose()`, logging any exception.
    final void doClose() @trusted nothrow
    {
        catchAndLogException((){
            stop();
            onClose();
        }());
    }

    /// Loop-thread half of `close`.
    final void _postClose(){
        if(_client)
            _client.close();
    }

    /// Loop-thread half of `write`: calls `rest()` and writes, or finishes the
    /// buffer when the stream is gone.
    final void _postWriteBuffer(StreamWriteBuffer buffer)
    {
        if (_client) {
            rest();
            _client.write(buffer);
        } else
            buffer.doFinish();
    }

private:
    TcpStream _client;
    EventLoop _loop;
}